package Key_Value类型

import org.apache.spark.{SparkConf, SparkContext}

object combineByKey {
    /*
     参数：(createCombiner: V => C,  mergeValue: (C, V) => C,  mergeCombiners: (C, C) => C)
        1.	作用：对相同K，把V合并成一个集合。
        2.	参数描述：
        （1）createCombiner: combineByKey() 会遍历分区中的所有元素，因此每个元素的键要么还没有遇到过，要么就和之前的某个元素的键相同。如果这是一个新的元素,combineByKey()会使用一个叫作createCombiner()的函数来创建那个键对应的累加器的初始值
        （2）mergeValue: 如果这是一个在处理当前分区之前已经遇到的键，它会使用mergeValue()方法将该键的累加器对应的当前值与这个新的值进行合并
        （3）mergeCombiners: 由于每个分区都是独立处理的， 因此对于同一个键可以有多个累加器。如果有两个或者更多的分区都有对应同一个键的累加器， 就需要使用用户提供的 mergeCombiners() 方法将各个分区的结果进行合并。
    */

    def main(args: Array[String]): Unit = {

        val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")

        val sc = new SparkContext(config)

        val input = sc.parallelize(Array(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98)), 2)

        //将相同key对应的值相加，同时记录该key出现的次数，放入一个二元组
        val combine = input.combineByKey((_, 1),
            (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
            (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2))

        combine.collect().foreach(println)

        //计算平均值
        val result = combine.map { case (key, value) => (key, value._1 / value._2.toDouble) }

        result.collect().foreach(println)

        //做WordCount
        input.combineByKey(
            (x => x),
            (x: Int, y/*:Int*/) => x + y,
            (x: Int, y: Int) => x + y
        )
                .collect().foreach(println)
    }

}
